home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / p4 / p4-1_2b.lha / p4-1.2b / lib / p4_tsr.c < prev    next >
C/C++ Source or Header  |  1993-02-06  |  13KB  |  519 lines

  1. #include "p4.h"
  2. #include "p4_sys.h"
  3.  
  4. /*
  5.  * search_p4_queue tries to locate a message of the desired type in the
  6.  * local queue of messages already received.  If it finds one, it dequeues it 
  7.  * if deq is true, and returns its address; otherwise it returns NULL.
  8.  */
  9. struct p4_msg *search_p4_queue(req_type, req_from, deq)
  10. int req_type, req_from;
  11. P4BOOL deq;
  12. {
  13.     struct p4_queued_msg *qpp, *qp;
  14.     struct p4_msg *tqp;
  15.     P4BOOL found;
  16.  
  17.     tqp = NULL;
  18.     qpp = NULL;
  19.     found = FALSE;
  20.     qp = p4_local->queued_messages->first_msg;
  21.  
  22.     while (qp && !found)
  23.     {
  24.     if (((qp->qmsg->type == req_type) || (req_type == -1)) &&
  25.         ((qp->qmsg->from == req_from) || (req_from == -1)))
  26.     {
  27.         found = TRUE;
  28.         if (deq)
  29.         {
  30.         if (p4_local->queued_messages->first_msg ==
  31.             p4_local->queued_messages->last_msg)
  32.         {
  33.             p4_local->queued_messages->first_msg = NULL;
  34.             p4_local->queued_messages->last_msg = NULL;
  35.         }
  36.         else
  37.         {
  38.             if (qp == p4_local->queued_messages->first_msg)
  39.             {
  40.             p4_local->queued_messages->first_msg = qp->next;
  41.             }
  42.             else if (qp == p4_local->queued_messages->last_msg)
  43.             {
  44.             p4_local->queued_messages->last_msg = qpp;
  45.             qpp->next = NULL;
  46.             }
  47.             else
  48.             {
  49.             qpp->next = qp->next;
  50.             }
  51.         }
  52.         }
  53.     }
  54.     else
  55.     {
  56.         qpp = qp;
  57.         qp = qp->next;
  58.     }
  59.     }
  60.     if (found)
  61.     {
  62.     p4_dprintfl(30,"extracted queued msg of type %d from %d\n",
  63.             qp->qmsg->type,qp->qmsg->from);
  64.     tqp = qp->qmsg;
  65.     if (deq)
  66.     {
  67.         free_quel(qp);
  68.     }
  69.     }
  70.     return (tqp);
  71. }
  72.  
  73. /*
  74.  * This is the top-level receive routine, called by the user.
  75.  *   req_type is either a desired type or -1.  In the -1 case it will be
  76.  *        modified  by p4_recv to indicate the type actually received.
  77.  *   req_from is either a desired source or -1.  In the -1 case it will be
  78.  *        modified by p4_recv to the source of the message actually received.
  79.  *   msg will be set by p4_recv to point to a buffer containing the message.
  80.  *   len_rcvd will be set by p4_recv to contain the length of the message.
  81.  *
  82.  *   returns 0 if successful; non-zero if error
  83.  */
  84. int p4_recv(req_type, req_from, msg, len_rcvd)
  85. int *req_type, *req_from, *len_rcvd;
  86. char **msg;
  87. {
  88.     struct p4_msg *tmsg;
  89.     P4BOOL good;
  90.  
  91.     p4_dprintfl(20, "receiving for type = %d, sender = %d\n",
  92.         *req_type, *req_from);
  93.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  94.     ALOG_LOG(p4_local->my_id,BEGIN_RECV,*req_from,"");
  95.  
  96.     if (!(tmsg = search_p4_queue(*req_type, *req_from, 1)))
  97.     {
  98.     for (good = FALSE; !good;)
  99.     {
  100.             ALOG_LOG(p4_local->my_id,END_RECV,0,"");
  101.             ALOG_LOG(p4_local->my_id,BEGIN_WAIT,0,"");
  102.             tmsg = recv_message(req_type, req_from);
  103.             ALOG_LOG(p4_local->my_id,END_WAIT,0,"");
  104.             ALOG_LOG(p4_local->my_id,BEGIN_RECV,0,"");
  105.         if (tmsg == NULL)
  106.         {
  107.         p4_dprintf("p4_recv: could not receive a message\n");
  108.         return (-1);
  109.         }
  110.         if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  111.         ((tmsg->from == *req_from) || (*req_from == -1)))
  112.         {
  113.         good = TRUE;
  114.         }
  115.         else
  116.         {
  117.         queue_p4_message(tmsg, p4_local->queued_messages) ;
  118.         }
  119.     }
  120.     }
  121.  
  122.     if (tmsg->ack_req & P4_BROADCAST_MASK)
  123.     {
  124.     if (subtree_broadcast_p4(tmsg->type, tmsg->from,(char *) &tmsg->msg,
  125.                  tmsg->len, tmsg->data_type))
  126.     {
  127.         p4_dprintf("p4_recv: subtree_brdcst failed\n");
  128.         return -1;
  129.     }
  130.     tmsg->ack_req &= ~P4_BROADCAST_MASK;    /* Unset broadcast bit */
  131.     if (tmsg->from == p4_get_my_id())
  132.         free_p4_msg(tmsg);    /* Don't want broadcast from self */
  133.     }
  134.  
  135.     *req_type = tmsg->type;
  136.     *req_from = tmsg->from;
  137.     p4_dprintfl(10, "received type=%d, from=%d\n",*req_type,*req_from);
  138.     if (*msg == NULL)
  139.     {
  140.     *msg = (char *) &(tmsg->msg);
  141.     *len_rcvd = tmsg->len;
  142.     }
  143.     else
  144.     {
  145.     /* copy into the user's buffer area, truncating if necessary */
  146.     if (tmsg->len < *len_rcvd)
  147.         *len_rcvd = tmsg->len;
  148.     bcopy((char *) &(tmsg->msg),*msg,*len_rcvd);
  149.     free_p4_msg(tmsg);
  150.     }
  151.     ALOG_LOG(p4_local->my_id,END_RECV,*req_from,"");
  152.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  153.  
  154.     return (0);
  155. }
  156.  
  157. struct p4_msg *recv_message(req_type,req_from)
  158. int *req_type, *req_from;
  159. {
  160.     int rc, len;
  161.     struct p4_msg *tmsg;
  162.  
  163. #if  defined(CAN_DO_SOCKET_MSGS) && \
  164.     !defined(CAN_DO_SHMEM_MSGS)  && \
  165.     !defined(CAN_DO_CUBE_MSGS)   && \
  166.     !defined(CAN_DO_SWITCH_MSGS)
  167.  
  168.     return (socket_recv());
  169.  
  170. #else
  171.  
  172.     while (TRUE)
  173.     {
  174. #       if defined(CAN_DO_SHMEM_MSGS)
  175.     if (shmem_msgs_available())
  176.     {
  177.         return (shmem_recv());
  178.     }
  179. #       endif
  180.  
  181. #       if defined(CAN_DO_SOCKET_MSGS)
  182.     if (socket_msgs_available())
  183.     {
  184.         return (socket_recv());
  185.     }
  186. #       endif
  187.  
  188. #       if defined(CAN_DO_CUBE_MSGS)
  189.     if (MD_cube_msgs_available())
  190.         return (MD_cube_recv());
  191. #       endif
  192.  
  193. #       if defined(CAN_DO_SWITCH_MSGS)
  194.     if (p4_global->proctable[p4_local->my_id].switch_port != -1)
  195.     {
  196.         if (rc = sw_probe(req_from, p4_local->my_id, req_type, &len))
  197.         {
  198.         tmsg = alloc_p4_msg(len - sizeof(struct p4_msg) + sizeof(char *));
  199.         sw_recv(rc, tmsg);
  200.         p4_dprintfl(10, "p4_recv: received message from switch\n");
  201.         return (tmsg);
  202.         }
  203.     }
  204. #       endif
  205. #       if defined(CAN_DO_TCMP_MSGS)
  206.     if (MD_tcmp_msgs_available(req_type,req_from))
  207.     {
  208.         return (MD_tcmp_recv());
  209.     }
  210. #       endif
  211.     }
  212.  
  213. #endif
  214. }
  215.  
  216. P4BOOL p4_messages_available(req_type, req_from)
  217. int *req_type, *req_from;
  218. {
  219.     int found, len;
  220.     struct p4_msg *tmsg;
  221.  
  222.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  223.     ALOG_LOG(p4_local->my_id,BEGIN_WAIT,1,"");
  224.  
  225.     found = FALSE;
  226.     if (tmsg = search_p4_queue(*req_type, *req_from, 0))
  227.     {
  228.     found = TRUE;
  229.     *req_type = tmsg->type;
  230.     *req_from = tmsg->from;
  231.     }
  232.  
  233. #   if defined(CAN_DO_SHMEM_MSGS)
  234.     while (!found && shmem_msgs_available())
  235.     {
  236.     tmsg = shmem_recv();
  237.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  238.         ((tmsg->from == *req_from) || (*req_from == -1)))
  239.     {
  240.         found = TRUE;
  241.         *req_type = tmsg->type;
  242.         *req_from = tmsg->from;
  243.     }
  244.     queue_p4_message(tmsg, p4_local->queued_messages);
  245.     }
  246. #   endif
  247.  
  248. #   if defined(CAN_DO_SOCKET_MSGS)
  249.     while (!found && socket_msgs_available())
  250.     {
  251.     tmsg = socket_recv();
  252.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  253.         ((tmsg->from == *req_from) || (*req_from == -1)))
  254.     {
  255.         found = TRUE;
  256.         *req_type = tmsg->type;
  257.         *req_from = tmsg->from;
  258.     }
  259.     queue_p4_message(tmsg, p4_local->queued_messages);
  260.     }
  261. #   endif
  262.  
  263. #   if defined(CAN_DO_CUBE_MSGS)
  264.     while (!found && MD_cube_msgs_available())
  265.     {
  266.     tmsg = MD_cube_recv();
  267.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  268.         ((tmsg->from == *req_from) || (*req_from == -1)))
  269.     {
  270.         found = TRUE;
  271.         *req_type = tmsg->type;
  272.         *req_from = tmsg->from;
  273.     }
  274.     queue_p4_message(tmsg, p4_local->queued_messages);
  275.     }
  276. #   endif
  277.  
  278.  
  279. #if defined(CAN_DO_SWITCH_MSGS)
  280.     if (!found && (p4_global->proctable[p4_local->my_id].switch_port != -1))
  281.     {
  282.     if (sw_probe(req_from, p4_local->my_id, req_type, &len))
  283.         found = TRUE;
  284.     }
  285. #endif
  286.  
  287. #if defined(CAN_DO_TCMP_MSGS)
  288.     if (!found && MD_tcmp_msgs_available(req_from,req_type))
  289.     found = TRUE;
  290. #endif
  291.  
  292.     ALOG_LOG(p4_local->my_id,END_WAIT,1,"");
  293.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  294.  
  295.     return (found);
  296. }                /* p4_messages_available */
  297.  
  298. P4VOID queue_p4_message(msg, hdr)
  299. struct p4_msg *msg;
  300. struct p4_msg_queue *hdr;
  301. {
  302.     struct p4_queued_msg *q;
  303.  
  304.     q = alloc_quel();
  305.     q->qmsg = msg;
  306.     q->next = NULL;
  307.  
  308.     if (hdr->first_msg == NULL)
  309.     {
  310.     hdr->first_msg = q;
  311.     }
  312.     else
  313.     {
  314.     hdr->last_msg->next = q;
  315.     }
  316.     hdr->last_msg = q;
  317.     p4_dprintfl(30,"queued type %d message for process %d quel=%d\n",
  318.         msg->type,msg->to,q);
  319. }
  320.  
  321.  
  322. int send_message(type, from, to, msg, len, data_type, ack_req, p4_buff_ind)
  323. char *msg;
  324. int type, to, len, data_type;
  325. P4BOOL ack_req, p4_buff_ind;
  326. {
  327.     struct p4_msg *tmsg;
  328.     int conntype = p4_local->conntab[to].type;
  329.  
  330.     p4_dprintfl(90, "send_message: to = %d, conntype=%d conntype=%s\n",
  331.         to, conntype, print_conn_type(conntype));
  332.  
  333.     switch (conntype)
  334.     {
  335.       case CONN_ME:
  336.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  337.     p4_dprintfl(20, "sending msg of type %d to myself\n",type);
  338.     queue_p4_message(tmsg, p4_local->queued_messages);
  339.     p4_dprintfl(10, "sent msg of type %d to myself\n",type);
  340.     break;
  341.  
  342. #ifdef CAN_DO_SHMEM_MSGS
  343.       case CONN_SHMEM:
  344.     tmsg = get_tmsg(type, from, to, msg, len, data_type, 
  345.                         ack_req, p4_buff_ind);
  346.     shmem_send(tmsg);
  347.     break;
  348. #endif
  349.  
  350. #ifdef CAN_DO_CUBE_MSGS
  351.       case CONN_CUBE:
  352.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  353.     MD_cube_send(tmsg);
  354.     if (!p4_buff_ind)
  355.         free_p4_msg(tmsg);
  356.     break;
  357. #endif
  358.  
  359. #ifdef CAN_DO_SOCKET_MSGS
  360.       case CONN_REMOTE_NON_EST:
  361.     if (establish_connection(to))
  362.     {
  363.         p4_dprintfl(90, "send_message: conn just estabd to %d\n", to);
  364.     }
  365.     else
  366.     {
  367.         p4_dprintf("send_message: unable to estab conn to %d\n", to);
  368.         return (-1);
  369.     }
  370.     /* no break; - just fall into connected code */
  371.       case CONN_REMOTE_EST:
  372.     if (data_type == P4NOX || p4_local->conntab[to].same_data_rep)
  373.     {
  374.         socket_send(type, from, to, msg, len, data_type, ack_req);
  375.     }
  376.     else
  377.     {
  378. #           ifdef CAN_DO_XDR
  379.         xdr_send(type, from, to, msg, len, data_type, ack_req);
  380. #           else
  381.         p4_error("cannot do xdr sends\n",0);
  382. #           endif
  383.     }
  384.     break;
  385. #endif
  386.  
  387. #if defined(CAN_DO_SWITCH_MSGS)
  388.       case CONN_REMOTE_SWITCH:
  389.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  390.     p4_dprintfl(20, "sending msg of type %d from %d to %d via switch_port %d\n",
  391.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  392.     sw_send(from, to,
  393.         p4_local->conntab[tmsg->to].switch_port, tmsg,
  394.         tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  395.         type);
  396.     p4_dprintfl(10, "sent msg of type %d from %d to %d via switch_port %d\n",
  397.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  398.     if (!p4_buff_ind)
  399.         free_p4_msg(tmsg);
  400.     break;
  401. #endif
  402.  
  403. #if defined(CAN_DO_TCMP_MSGS)
  404.       case CONN_TCMP:
  405.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  406.     p4_dprintfl(20, "sending msg of type %d to %d via tcmp\n",type,to);
  407.     MD_tcmp_send(type, from, to, tmsg, 
  408.              tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  409.              data_type, ack_req);
  410.     p4_dprintfl(10, "sent msg of type %d to %d via tcmp\n",type,to);
  411.     break;
  412. #endif
  413.  
  414.       case CONN_REMOTE_DYING:
  415.     p4_dprintfl(90, "send_message: proc %d is dying\n", to);
  416.     return (-1);
  417.  
  418.       default:
  419.     p4_dprintf("send_message: to=%d; invalid conn type=%d\n",to,conntype);
  420.     return (-1);
  421.     }
  422.  
  423.     return (0);
  424. }                /* send_message */
  425.  
  426. struct p4_msg *get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind)
  427. char *msg;
  428. int type, from, to, len, data_type, ack_req, p4_buff_ind;
  429. {
  430.     int i;
  431.     struct p4_msg *tmsg;
  432.  
  433.     if (p4_buff_ind)
  434.     {
  435.     tmsg = (struct p4_msg *) (msg - sizeof(struct p4_msg) + sizeof(char *));
  436.     }
  437.     else
  438.     {
  439.         tmsg = alloc_p4_msg(len);
  440.     if (tmsg == NULL)
  441.     {
  442.         p4_dprintf("OOPS! get_tmsg: could not alloc buff **\n");
  443.         return (NULL);
  444.     }
  445.     bcopy(msg, (char *) &(tmsg->msg), len);
  446.     }
  447.     tmsg->type = type;
  448.     tmsg->from = from;
  449.     tmsg->to = to;
  450.     tmsg->len = len;
  451.     tmsg->ack_req = ack_req;
  452.     tmsg->data_type = data_type;
  453.     return (tmsg);
  454. }
  455.  
  456.  
  457. char *p4_msg_alloc(msglen)
  458. int msglen;
  459. {
  460.     char *t;
  461.  
  462.     t = (char *) alloc_p4_msg(msglen);
  463.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  464.     t = t + sizeof(struct p4_msg) - sizeof(char *);
  465.     return(t);
  466. }
  467.  
  468. P4VOID p4_msg_free(m)
  469. char *m;
  470. {
  471.     char *t;
  472.  
  473.     t = m - sizeof(struct p4_msg) + sizeof(char *);
  474.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  475.     free_p4_msg((struct p4_msg *) t);
  476. }
  477.  
  478.  
  479. P4VOID initialize_msg_queue(mq)
  480. struct p4_msg_queue *mq;
  481. {
  482.     mq->first_msg = NULL;
  483.     mq->last_msg = NULL;
  484.     (P4VOID) p4_moninit(&(mq->m), 1);
  485.     p4_lock_init(&(mq->ack_lock));
  486.     p4_lock(&(mq->ack_lock));
  487. }
  488.  
  489.  
  490. struct p4_queued_msg *alloc_quel()
  491. {
  492.     struct p4_queued_msg *q;
  493.  
  494.     p4_lock(&p4_global->avail_quel_lock);
  495.     if (p4_global->avail_quel == NULL)
  496.     {
  497.     q = (struct p4_queued_msg *) p4_shmalloc(sizeof(struct p4_queued_msg));
  498.     if (!q)
  499.         p4_error("alloc_quel:  could not allocate queue element",
  500.              sizeof(struct p4_queued_msg));
  501.     }
  502.     else
  503.     {
  504.     q = p4_global->avail_quel;
  505.     p4_global->avail_quel = q->next;
  506.     }
  507.     p4_unlock(&p4_global->avail_quel_lock);
  508.     return (q);
  509. }
  510.  
  511. P4VOID free_quel(q)
  512. struct p4_queued_msg *q;
  513. {
  514.     p4_lock(&p4_global->avail_quel_lock);
  515.     q->next = p4_global->avail_quel;
  516.     p4_global->avail_quel = q;
  517.     p4_unlock(&p4_global->avail_quel_lock);
  518. }
  519.